package com.rtsapp.server.network.client;

import com.rtsapp.server.logger.Logger;
import com.rtsapp.server.logger.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * Created by admin on 16-7-19.
 */
public class NettyClient implements IMsgSender{

    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
    private static final long RECONN_INTERVAL_MILLIS = 1000;


    private final SocketAddress remotePeer;
    private final EventLoopGroup eventLoop;
    private final int maxFrameSize; // 40960
    private final ChannelInboundHandler inHandler;
    private final ChannelOutboundHandler outHandler;

    private volatile NettyClientHandler socketHandler = null;
    private volatile Channel socketChannel = null;
    private volatile boolean socketOpen = false;


    public NettyClient( EventLoopGroup eventLoop, String ip, int port , int maxFrameSize,  ChannelInboundHandler inHandler, ChannelOutboundHandler outHandler) {
        this.eventLoop = eventLoop;
        this.remotePeer = new InetSocketAddress(ip, port);
        this.maxFrameSize = maxFrameSize;
        this.inHandler = inHandler;
        this.outHandler = outHandler;
    }




    @Override
    public void send(Object msg) {
        Channel channel = socketChannel;
        if( channel != null ){
            channel.writeAndFlush( msg );
        }
    }


    /**
     * 发起一个Socket重连
     */
    public void connect( ){
        doConnect(remotePeer, 0);
    }

    public void closeAndReconnect( NettyClientHandler handler ){
        synchronized ( this ){
            if( this.socketHandler == handler ){
                this.socketHandler = null;

                setStateClose();

                reconnect();
            }
        }
    }

    private void setStateClose(){
        synchronized (this) {
            socketOpen = false;
            socketChannel = null;
        }
    }


    private void reconnect(){
        doConnect(remotePeer, RECONN_INTERVAL_MILLIS);
    }


    /**
     * 在事件循环中执行连接
     * @param remotePeer
     * @param delay
     */
    private void doConnect(   final SocketAddress remotePeer, long delay ){

        synchronized (this) {
            if ( socketOpen) {
                return;
            }

            socketOpen = true;

            this.eventLoop.schedule(new Runnable() {

                @Override
                public void run() {
                    doConnectTask(remotePeer, true);
                }

            }, delay, TimeUnit.MILLISECONDS);

        }
    }

    /**
     * 直接连接
     * @param remotePeer
     */
    private void doConnectTask(final SocketAddress remotePeer, boolean isReconnect ) {
        try {
            Bootstrap b = new Bootstrap();


            b.group( eventLoop )
                    .channel(NioSocketChannel.class)
                    .handler( new NettyClientInitializer(  ) );

            ChannelFuture channelFuture = b.connect( remotePeer );
            channelFuture.addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                    if ( channelFuture.isSuccess()) {
                        LOG.info("Successfully connect to remote server. remote peer={}", remotePeer);

                        socketChannel = channelFuture.channel();
                        socketHandler =  channelFuture.channel().pipeline().get( NettyClientHandler.class );


                    } else {
                        LOG.info( "Can't connect to remote server. remote peer={}", remotePeer.toString() );

                        setStateClose();

                        if (isReconnect) {
                            reconnect();
                        }
                    }
                }
            });


        } catch (Exception e) {
            LOG.error("doConnect got exception" , e);

            setStateClose();

            if( isReconnect ) {
                reconnect();
            }
        }
    }



    private  class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast( new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4) );
            ch.pipeline().addLast( new NettyClientHandler( NettyClient.this, inHandler) );
            ch.pipeline().addLast( outHandler );
        }


        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOG.error("CommandCaseChannelInitializer exceptionCaught ", cause);
            super.exceptionCaught(ctx, cause );
            ctx.close();
        }


    }
}
